package com.parser.sixoo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * HBase utility class.
 *
 * <p>Static helpers for obtaining the shared HBase connection, running DDL
 * (namespaces, tables, column families) and writing {@link Put}s, plus a
 * {@link #main(String[])} driver that pushes generated Puts through a bounded queue
 * to a configurable number of writer threads.
 *
 * <li>Description: HBase utility class</li>
 * <li>@author: zhongzhi</li>
 * <li>@date 2017-09-07 11:43:19</li>
 */

public class HbaseStoreMain implements Serializable {

    /**
     * Logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseStoreMain.class);

    /**
     * Bounded hand-off queue (capacity 1000) between the producer thread and the
     * writer threads started by {@link #main(String[])}.
     */
    public static final LinkedBlockingQueue<Put> queue = new LinkedBlockingQueue<Put>(1000);

    /** Number of Puts generated by the producer in {@link #main(String[])}. */
    private static final int DEMO_PUT_COUNT = 60000;

    /** Number of columns added to every generated Put. */
    private static final int DEMO_COLUMNS_PER_PUT = 30000;

    /**
     * Shared HBase connection, lazily created by {@link #getConnection()}.
     */
    private static Connection CONN;

    /** Settings loaded by {@link #loadProp(String)}. */
    private static final Properties props = new Properties();

    /**
     * Loads a properties file into the shared settings.
     *
     * <p>When the system property {@code PROJECT_CONF_DIR} is unset or empty the file is
     * read from the classpath; otherwise it is read from that directory. The stream is
     * always closed.
     *
     * @param name file name of the properties file
     * @throws Exception if the file cannot be found or read; the cause is preserved
     */
    public static void loadProp(String name) throws Exception {
        String confDir = System.getProperty("PROJECT_CONF_DIR");
        try (InputStream in = openConfig(confDir, name)) {
            props.load(in);
        } catch (Exception e) {
            throw new Exception("加载" + name + "配置文件失败！conf_dir=" + confDir, e);
        }
    }

    /** Opens the configuration stream from the classpath or from {@code confDir}. */
    private static InputStream openConfig(String confDir, String name) throws IOException {
        if (confDir == null || confDir.isEmpty()) {
            // Resolved through this class's own loader; a missing resource yields null, not an exception.
            InputStream in = HbaseStoreMain.class.getClassLoader().getResourceAsStream(name);
            if (in == null) {
                throw new IOException("classpath resource not found: " + name);
            }
            return in;
        }
        return new FileInputStream(confDir + File.separator + name);
    }

    /**
     * Load-test driver: starts {@code thread.num} writer threads that drain {@link #queue}
     * into table {@code test1}, and one producer thread that fills the queue. Returns after
     * the producer has finished, the queue has been drained and the connection is closed.
     *
     * @param args unused
     * @throws Exception if configuration loading or connecting fails, or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Load settings first so a bad configuration fails before a cluster connection is opened.
        loadProp("redis.properties");
        String threadNumText = props.getProperty("thread.num");
        if (threadNumText == null) {
            throw new IllegalArgumentException("missing property: thread.num");
        }
        final int threadNum = Integer.parseInt(threadNumText.trim());
        if (threadNum < 1) {
            // Without a consumer the producer would block forever on the bounded queue.
            throw new IllegalArgumentException("thread.num must be >= 1 but was " + threadNum);
        }
        final String rowkeyPrefix = props.getProperty("rowkey.prefix");
        final Connection conn = getConnection();
        final TableName tableName = TableName.valueOf("test1");
        final AtomicBoolean producerDone = new AtomicBoolean(false);

        Thread[] workers = new Thread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            workers[i] = new Thread(() -> drainQueue(conn, tableName, producerDone));
            workers[i].start();
        }

        Thread producer = new Thread(() -> produce(rowkeyPrefix));
        producer.start();

        try {
            producer.join();
        } finally {
            // Signal the writers even if this thread was interrupted while waiting.
            producerDone.set(true);
        }
        for (Thread worker : workers) {
            worker.join();
        }
        closeConn();
    }

    /**
     * Writer loop: takes Puts from {@link #queue} and writes them through a Table owned by
     * the calling thread, until the producer is done and the queue is empty.
     */
    private static void drainQueue(Connection conn, TableName tableName, AtomicBoolean producerDone) {
        // Each writer opens its own Table: Table instances are not safe to share between threads.
        try (Table table = conn.getTable(tableName)) {
            while (true) {
                // Read the flag BEFORE polling: if it was already set, a null poll proves the queue is drained.
                boolean noMoreInput = producerDone.get();
                Put put = queue.poll(1, TimeUnit.SECONDS);
                if (put == null) {
                    if (noMoreInput) {
                        return;
                    }
                    continue;
                }
                long start = System.currentTimeMillis();
                try {
                    table.put(put);
                    System.out.println(Thread.currentThread() + "," + queue.size() + ",耗时：" + (System.currentTimeMillis() - start) / 1000);
                } catch (IOException | RuntimeException e) {
                    // Keep the writer alive; a dead writer pool would stall the producer.
                    LOGGER.error("Failed to write put to " + tableName, e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Writer thread interrupted, stopping.", e);
        } catch (IOException e) {
            LOGGER.error("Failed to open or close table " + tableName, e);
        }
    }

    /**
     * Producer loop: builds {@value #DEMO_PUT_COUNT} Puts of {@value #DEMO_COLUMNS_PER_PUT}
     * columns each and enqueues every one, waiting as long as the queue is full.
     */
    private static void produce(String rowkeyPrefix) {
        long t = System.currentTimeMillis();
        // NOTE(review): the timestamp is taken once, so every Put targets the same row key — confirm this is intended.
        byte[] rowKey = Bytes.toBytes(rowkeyPrefix + t);
        byte[] family = Bytes.toBytes("f");
        for (int k = 0; k < DEMO_PUT_COUNT; k++) {
            Put put = new Put(rowKey);
            for (int i = 0; i < DEMO_COLUMNS_PER_PUT; i++) {
                put.addColumn(family, Bytes.toBytes("p" + i), Bytes.toBytes(i));
            }
            put.setDurability(Durability.SKIP_WAL);
            try {
                boolean queued = false;
                while (!queued) {
                    // Retry until accepted; a timed-out offer must not be treated as success.
                    queued = queue.offer(put, 10, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread() + "-------------------------" + queue.size());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Producer thread interrupted, stopping.", e);
                return;
            }
        }
        System.out.println("数据准备完成");
    }

    /**
     * Returns the shared HBase connection, creating it when it does not exist yet or has
     * been closed.
     *
     * @return Hbase connection
     * @throws Exception if the connection cannot be established
     */
    public synchronized static Connection getConnection() throws Exception {
        if (null == CONN || CONN.isClosed()) {
            try {
                Configuration conf = HBaseConfiguration.create();
                CONN = ConnectionFactory.createConnection(conf);
            } catch (Exception e) {
                LOGGER.error("不能建立HBASE连接", e);
                throw new Exception("不能建立HBASE连接", e);
            }
        }

        return CONN;
    }

    /**
     * Creates a namespace; logs a warning and does nothing when it already exists.
     *
     * @param namespace namespace name
     * @throws Exception Exception
     */
    public static void createNamespace(String namespace) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            if (namespaceIsExists(admin, namespace)) {
                LOGGER.warn("The namespace {}  already exists !", namespace);
                return;
            }
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
            LOGGER.info("create namespace {} seccuss.", namespace);
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Checks whether a table exists.
     *
     * @param tableName tableName
     * @return true if the table exists, false otherwise
     * @throws Exception Exception
     */
    public static boolean tableExists(String tableName) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            return admin.tableExists(TableName.valueOf(tableName));
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Creates a table without explicit split keys. Every column family uses SNAPPY
     * compression and keeps up to 3 versions. Logs a warning and does nothing when the
     * table already exists.
     *
     * @param tableName table name
     * @param cfs       column family names
     * @throws Exception if the table cannot be created; the failure is logged and rethrown
     */
    public static void createTable(String tableName, String... cfs) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                LOGGER.warn("The table {}  already exists !", tableName);
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(name);
            for (String family : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
                columnDescriptor.setCompressionType(Compression.Algorithm.SNAPPY);
                // Finish configuring the family before it is handed to the table descriptor.
                columnDescriptor.setMaxVersions(3);
                tableDescriptor.addFamily(columnDescriptor);
            }
            admin.createTable(tableDescriptor);
            LOGGER.info("创建表成功 {}  seccuss.", tableName);
        } catch (Exception e) {
            // Report the failure to the caller instead of hiding it behind a stack trace.
            LOGGER.error("Failed to create table " + tableName, e);
            throw e;
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Truncates a table while preserving its region splits. Logs an error and does
     * nothing when the table does not exist.
     *
     * @param tableName table name
     * @throws Exception Exception
     */
    public static void truncateTable(String tableName) throws Exception {
        Admin admin = null;
        TableName tableNameObj = TableName.valueOf(tableName);
        try {
            admin = getConnection().getAdmin();
            if (!admin.tableExists(tableNameObj)) {
                LOGGER.error("The table {}  does not exists!", tableName);
                return;
            }
            // Disabling an already-disabled table throws, so only disable when needed.
            if (admin.isTableEnabled(tableNameObj)) {
                admin.disableTable(tableNameObj);
            }
            admin.truncateTable(tableNameObj, true);
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Lists the column family names of a table.
     *
     * @param tableNames table name
     * @return column family names
     * @throws Exception Exception
     */
    public static List<String> getFamilyFields(String tableNames) throws Exception {
        Admin admin = null;
        List<String> families = new LinkedList<>();
        try {
            admin = getConnection().getAdmin();
            HTableDescriptor descriptor = admin.getTableDescriptor(TableName.valueOf(tableNames));
            for (HColumnDescriptor family : descriptor.getFamilies()) {
                families.add(family.getNameAsString());
            }
            return families;
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Adds new column families to an existing table, one request per family.
     *
     * @param tableName tableName
     * @param families  families
     * @throws Exception Exception
     */
    public static void addColumnFamily(String tableName, String... families) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            TableName name = TableName.valueOf(tableName);
            for (String family : families) {
                admin.addColumn(name, new HColumnDescriptor(family));
            }
        } finally {
            closeAdmin(admin);
        }
    }


    /**
     * Writes a batch of Puts to a table.
     *
     * @param tableName tableName
     * @param puts      List<Put>
     * @return always {@code true}; failures are reported by exception
     * @throws Exception if the write fails; the failure is logged and the cause preserved
     */
    public static boolean batchPuts(final String tableName, List<Put> puts)
            throws Exception {
        Table table = null;
        try {
            table = getConnection().getTable(TableName.valueOf(tableName));
            table.put(puts);
        } catch (Exception e) {
            LOGGER.error("批量存储数据失败！ ", e);
            throw new Exception("批量存储数据失败！", e);
        } finally {
            closeTable(table);
        }

        return true;
    }

    /**
     * Writes Puts to a table through a {@link BufferedMutator} with a 5 MiB write buffer.
     * Despite the name, the buffer is flushed and the mutator closed before this method
     * returns.
     *
     * @param tablename table name
     * @param puts      data to add
     * @return {@code true} if no mutation was reported as failed, {@code false} otherwise
     * @throws Exception if the connection, mutator or flush fails
     */
    public static boolean asyncPuts(String tablename, List<Put> puts) throws Exception {
        final AtomicBoolean failed = new AtomicBoolean(false);
        final BufferedMutator.ExceptionListener listener = (e, mutator) -> {
            // With a listener installed failures are not thrown, so remember them for the return value.
            failed.set(true);
            for (int i = 0; i < e.getNumExceptions(); i++) {
                LOGGER.error("Failed to sent put " + e.getRow(i) + ".", e.getCause(i));
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .listener(listener)
                .writeBufferSize(5 * 1024 * 1024);

        try (BufferedMutator mutator = getConnection().getBufferedMutator(params)) {
            mutator.mutate(puts);
            mutator.flush();
        }

        // Evaluated after close() so failures reported during the final flush are included.
        return !failed.get();
    }

    /**
     * Checks whether a namespace exists.
     *
     * @param admin     Admin
     * @param namespace namespace name
     * @return true if the namespace exists, false otherwise
     * @throws Exception Exception
     */
    private static boolean namespaceIsExists(Admin admin, String namespace) throws Exception {
        return Arrays.stream(admin.listNamespaceDescriptors())
                .anyMatch(descriptor -> descriptor.getName().equals(namespace));
    }

    /**
     * Enables a table when it is not enabled yet.
     *
     * @param tableName table name
     * @throws Exception Exception
     */
    private static void enableTable(String tableName) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            TableName name = TableName.valueOf(tableName);
            // Nothing to do when the table is already enabled.
            if (admin.isTableEnabled(name)) {
                LOGGER.info("The table {} is available !", tableName);
                return;
            }
            admin.enableTable(name);
            LOGGER.info("enable talbe {} seccuss.", tableName);
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Flushes a table.
     *
     * @param tableName tableName
     * @throws Exception wrapping whatever the flush raised
     */
    public static void flushTable(String tableName) throws Exception {
        Admin admin = null;
        try {
            admin = getConnection().getAdmin();
            admin.flush(TableName.valueOf(tableName));
        } catch (Exception e) {
            throw new Exception(e);
        } finally {
            closeAdmin(admin);
        }
    }

    /**
     * Closes an HBase {@link Admin} (DDL handle); a close failure is logged, not thrown.
     *
     * @param admin admin to close, may be {@code null}
     */
    public static void closeAdmin(Admin admin) {
        if (null != admin) {
            try {
                admin.close();
            } catch (IOException e) {
                LOGGER.error("close admin failure !", e);
            }
        }
    }

    /**
     * Closes a table; a close failure is logged, not thrown.
     *
     * @param table table to close, may be {@code null}
     */
    public static void closeTable(Table table) {
        if (null != table) {
            try {
                table.close();
            } catch (IOException e) {
                LOGGER.error("close table failure !", e);
            }
        }
    }

    /**
     * Closes the shared HBase connection; a close failure is logged, not thrown.
     * Uses the same lock as {@link #getConnection()}.
     */
    public static synchronized void closeConn() {
        if (null != CONN) {
            try {
                CONN.close();
            } catch (IOException e) {
                LOGGER.error("close connection failure !", e);
            }
        }
    }
}
